clean up avatar example and add retry for datastream io rpc call#2943
clean up avatar example and add retry for datastream io rpc call#2943
Conversation
| """ | ||
| Send a request to the avatar service for it to join the room | ||
|
|
||
| This function should be wrapped in a avatar plugin. | ||
| """ | ||
|
|
||
| # read the agent identity from the token | ||
| try: | ||
| decoded = ctx.decode_token() | ||
| agent_identity = decoded["sub"] | ||
| except (RuntimeError, KeyError): | ||
| if not ctx.room.isconnected(): | ||
| raise RuntimeError( | ||
| "local participant identity not found in token, and room is not connected" | ||
| ) from None | ||
| agent_identity = ctx.room.local_participant.identity | ||
|
|
There was a problem hiding this comment.
I think we should provide a different API for the token.
Something like ctx.token_details(), it doesn't check any signature and return the typed api.VideoGrants, wdyt?
There was a problem hiding this comment.
There was a problem hiding this comment.
maybe an api.Claims? then read the local participant identity from claims.identity
There was a problem hiding this comment.
Oh yeah sry, api.Claims lgtm!
There was a problem hiding this comment.
This way we don't even need to catch for errors. It's safe to assume the token content is always OK (we ignore validation)
livekit-agents/livekit/agents/job.py
Outdated
| return jwt.decode(self._info.token, api_secret, options=options, algorithms=["HS256"]) # type: ignore | ||
|
|
||
| def token_details(self) -> Claims: | ||
| claims = self.decode_token() |
There was a problem hiding this comment.
IMO we should make decode_token private.
livekit-agents/livekit/agents/job.py
Outdated
| if video_dict := claims.get("video"): | ||
| video_dict = {camel_to_snake(k): v for k, v in video_dict.items()} | ||
| video_dict = { | ||
| k: v for k, v in video_dict.items() if k in VideoGrants.__dataclass_fields__ | ||
| } | ||
| claims["video"] = VideoGrants(**video_dict) | ||
|
|
||
| if sip_dict := claims.get("sip"): | ||
| sip_dict = {camel_to_snake(k): v for k, v in sip_dict.items()} | ||
| sip_dict = {k: v for k, v in sip_dict.items() if k in SIPGrants.__dataclass_fields__} | ||
| claims["sip"] = SIPGrants(**sip_dict) | ||
|
|
||
| if room_config := claims.get("roomConfig"): | ||
| claims["roomConfig"] = ParseDict( | ||
| room_config, RoomConfiguration(), ignore_unknown_fields=True | ||
| ) |
There was a problem hiding this comment.
I'm thinking that we could add a new argument to the TokenVerifier in livekit-api to skip verifications. So we don't duplicate code
There was a problem hiding this comment.
created a pr for python sdk livekit/python-sdks#473
|
needs livekit/python-sdks#473 |
| raise | ||
| retry_count += 1 | ||
| logger.warning("failed to notify the agent playback finished, retrying...") | ||
| await asyncio.sleep(0.01) |
livekit-agents/livekit/agents/job.py
Outdated
| options["verify_signature"] = False | ||
| api_secret = "" | ||
| return jwt.decode(self._info.token, api_secret, options=options, algorithms=["HS256"]) # type: ignore | ||
| def token_details(self) -> Claims: |
There was a problem hiding this comment.
| def token_details(self) -> Claims: | |
| def token_claims(self) -> Claims: |
Uh oh!
There was an error while loading. Please reload this page.